{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "618d7e9e",
   "metadata": {},
   "source": [
    "# 병렬 노드 실행을 위한 분기 생성 방법\n",
    "\n",
    "![](image/langgraph-09.jpeg)\n",
    "\n",
    "노드의 병렬 실행은 전체 그래프 작업의 속도를 향상시키는 데 필수적입니다. `LangGraph`는 노드의 병렬 실행을 기본적으로 지원하며, 이는 그래프 기반 워크플로우의 성능을 크게 향상시킬 수 있습니다. \n",
    "\n",
    "이러한 병렬화는 **fan-out**과 **fan-in** 메커니즘을 통해 구현되며, 표준 엣지와 `conditional_edges` 를 활용합니다. "
   ]
  },
  {
   "cell_type": "markdown",
   "id": "eb058fa7",
   "metadata": {},
   "source": [
    "## 환경 설정"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7715e3bd",
   "metadata": {},
   "outputs": [],
   "source": [
    "# API 키를 환경변수로 관리하기 위한 설정 파일\n",
    "from dotenv import load_dotenv\n",
    "\n",
    "# API 키 정보 로드\n",
    "load_dotenv()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "97f60a62",
   "metadata": {},
   "outputs": [],
   "source": [
    "# LangSmith 추적을 설정합니다. https://smith.langchain.com\n",
    "# !pip install -qU langchain-teddynote\n",
    "from langchain_teddynote import logging\n",
    "\n",
    "# 프로젝트 이름을 입력합니다.\n",
    "logging.langsmith(\"CH17-LangGraph-Modules\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2a87109f",
   "metadata": {},
   "source": [
    "## 병렬 노드 fan-out 및 fan-in\n",
    "\n",
    "**fan-out / fan-in**\n",
    "\n",
    "병렬 처리에서 **fan-out**과 **fan-in**은 작업을 나누고 모으는 과정을 설명하는 개념입니다.\n",
    "\n",
    "- **Fan-out (확장)**: 큰 작업을 여러 작은 작업으로 쪼갭니다. 예를 들어, 피자를 만들 때 도우, 소스, 치즈 준비를 각각 별도로 할 수 있습니다. 이렇게 각각의 파트를 나눠 동시에 처리하는 것이 fan-out입니다.\n",
    "\n",
    "- **Fan-in (수집)**: 나뉜 작은 작업들을 다시 하나로 합칩니다. 피자에 준비된 재료들을 모두 올려 완성 피자를 만드는 과정처럼, 여러 작업이 끝나면 결과를 모아 최종 작업을 완성하는 것이 fan-in입니다.\n",
    "\n",
    "즉, **fan-out**으로 작업을 분산시키고, **fan-in**으로 결과를 합쳐 최종 결과를 얻는 흐름입니다.\n",
    "\n",
    "----\n",
    "\n",
    "이 예시에서는 `Node A`에서 `B and C`로 팬아웃하고 `D`로 팬인하는 과정을 보여줍니다. \n",
    "\n",
    "State에서는 `reducer(add)` 연산자를 지정합니다. 이는 State 내 특정 키의 기존 값을 단순히 덮어쓰는 대신 값들을 결합하거나 누적합니다. 리스트의 경우, 새로운 리스트를 기존 리스트와 연결하는 것을 의미합니다.\n",
    "\n",
    "LangGraph는 State의 특정 키에 대한 reducer 함수를 지정하기 위해 `Annotated` 타입을 사용합니다. 이는 타입 검사를 위해 원래 타입(`list`)을 유지하면서도, 타입 자체를 변경하지 않고 reducer 함수(`add`)를 타입에 첨부할 수 있게 합니다."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "0da53871",
   "metadata": {},
   "outputs": [],
   "source": [
    "from typing import Annotated, Any\n",
    "from typing_extensions import TypedDict\n",
    "from langgraph.graph import StateGraph, START, END\n",
    "from langgraph.graph.message import add_messages\n",
    "\n",
    "\n",
    "# 상태 정의(add_messages 리듀서 사용)\n",
    "class State(TypedDict):\n",
    "    aggregate: Annotated[list, add_messages]\n",
    "\n",
    "\n",
    "# 노드 값 반환 클래스\n",
    "class ReturnNodeValue:\n",
    "    # 초기화\n",
    "    def __init__(self, node_secret: str):\n",
    "        self._value = node_secret\n",
    "\n",
    "    # 호출시 상태 업데이트\n",
    "    def __call__(self, state: State) -> Any:\n",
    "        print(f\"Adding {self._value} to {state['aggregate']}\")\n",
    "        return {\"aggregate\": [self._value]}\n",
    "\n",
    "\n",
    "# 상태 그래프 초기화\n",
    "builder = StateGraph(State)\n",
    "\n",
    "# 노드 A부터 D까지 생성 및 값 할당\n",
    "builder.add_node(\"a\", ReturnNodeValue(\"I'm A\"))\n",
    "builder.add_edge(START, \"a\")\n",
    "builder.add_node(\"b\", ReturnNodeValue(\"I'm B\"))\n",
    "builder.add_node(\"c\", ReturnNodeValue(\"I'm C\"))\n",
    "builder.add_node(\"d\", ReturnNodeValue(\"I'm D\"))\n",
    "\n",
    "# 노드 연결\n",
    "builder.add_edge(\"a\", \"b\")\n",
    "builder.add_edge(\"a\", \"c\")\n",
    "builder.add_edge(\"b\", \"d\")\n",
    "builder.add_edge(\"c\", \"d\")\n",
    "builder.add_edge(\"d\", END)\n",
    "\n",
    "# 그래프 컴파일\n",
    "graph = builder.compile()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3e3ba6e8",
   "metadata": {},
   "source": [
    "그래프를 시각화 합니다."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "89b5086c",
   "metadata": {},
   "outputs": [],
   "source": [
    "from langchain_teddynote.graphs import visualize_graph\n",
    "\n",
    "visualize_graph(graph)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6aff4a26",
   "metadata": {},
   "source": [
    "`reducer`를 통해 각 노드에 추가된 값들이 **누적**되는 것을 확인할 수 있습니다."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "64a87798",
   "metadata": {},
   "outputs": [],
   "source": [
    "# 그래프 실행\n",
    "graph.invoke({\"aggregate\": []}, {\"configurable\": {\"thread_id\": \"foo\"}})"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3ccf5a22",
   "metadata": {},
   "source": [
    "### 병렬처리 중 예외 발생시 대응\n",
    "\n",
    "LangGraph는 \"superstep(여러 노드들이 처리되는 전체 프로세스 스텝)\" 내에서 노드를 실행하는데, 이는 병렬 분기가 동시에 실행되더라도 전체 superstep이 **트랜잭션** 방식으로 처리됨을 의미합니다. \n",
    "\n",
    "따라서, 이러한 분기 중 하나라도 예외가 발생하면, 상태에 대한 업데이트가 **전혀** 적용되지 않습니다(전체 superstep이 오류 처리됨).\n",
    "\n",
    "> superstep: 여러 노드들이 처리되는 전체 프로세스 스텝\n",
    "\n",
    "![](image/langgraph-09.jpeg)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "af5ad6dd",
   "metadata": {},
   "source": [
    "\n",
    "오류가 발생하기 쉬운 작업(예: 불안정한 API 호출 처리)이 있는 경우, LangGraph는 이를 해결하기 위한 두 가지 방법을 제공합니다.\n",
    "\n",
    "1. 노드 내에서 일반적인 Python 코드를 작성하여 예외를 포착하고 처리할 수 있습니다.\n",
    "2. **[retry_policy](https://langchain-ai.github.io/langgraph/reference/graphs/#langgraph.graph.graph.CompiledGraph.retry_policy)** 를 설정하여 특정 유형의 예외가 발생한 노드를 재시도하도록 그래프를 지시할 수 있습니다. 실패한 분기만 재시도되므로 불필요한 작업 수행에 대해 걱정할 필요가 없습니다.\n",
    "\n",
    "이러한 기능들을 통해 병렬 실행과 예외 처리를 완벽하게 제어할 수 있습니다."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f501c7df",
   "metadata": {},
   "source": [
    "## 추가 단계가 있는 병렬 노드의 fan-out 및 fan-in\n",
    "\n",
    "위의 예시에서는 각 경로가 단일 단계일 때의 `fan-out`과 `fan-in` 방법을 보여주었습니다. 하지만 하나의 경로에 여러 단계가 있다면 어떻게 될까요?"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "0b0ab72a",
   "metadata": {},
   "outputs": [],
   "source": [
    "from typing import Annotated\n",
    "from typing_extensions import TypedDict\n",
    "from langgraph.graph import StateGraph\n",
    "from langgraph.graph.message import add_messages\n",
    "\n",
    "\n",
    "# 상태 정의(add_messages 리듀서 사용)\n",
    "class State(TypedDict):\n",
    "    aggregate: Annotated[list, add_messages]\n",
    "\n",
    "\n",
    "# 노드 값 반환 클래스\n",
    "class ReturnNodeValue:\n",
    "    # 초기화\n",
    "    def __init__(self, node_secret: str):\n",
    "        self._value = node_secret\n",
    "\n",
    "    # 호출시 상태 업데이트\n",
    "    def __call__(self, state: State) -> Any:\n",
    "        print(f\"Adding {self._value} to {state['aggregate']}\")\n",
    "        return {\"aggregate\": [self._value]}\n",
    "\n",
    "\n",
    "# 상태 그래프 초기화\n",
    "builder = StateGraph(State)\n",
    "\n",
    "# 노드 생성 및 연결\n",
    "builder.add_node(\"a\", ReturnNodeValue(\"I'm A\"))\n",
    "builder.add_edge(START, \"a\")\n",
    "builder.add_node(\"b1\", ReturnNodeValue(\"I'm B1\"))\n",
    "builder.add_node(\"b2\", ReturnNodeValue(\"I'm B2\"))\n",
    "builder.add_node(\"c\", ReturnNodeValue(\"I'm C\"))\n",
    "builder.add_node(\"d\", ReturnNodeValue(\"I'm D\"))\n",
    "builder.add_edge(\"a\", \"b1\")\n",
    "builder.add_edge(\"a\", \"c\")\n",
    "builder.add_edge(\"b1\", \"b2\")\n",
    "builder.add_edge([\"b2\", \"c\"], \"d\")\n",
    "builder.add_edge(\"d\", END)\n",
    "\n",
    "# 그래프 컴파일\n",
    "graph = builder.compile()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "abbfdf81",
   "metadata": {},
   "source": [
    "그래프를 시각화합니다."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b6abb4f4",
   "metadata": {},
   "outputs": [],
   "source": [
    "from langchain_teddynote.graphs import visualize_graph\n",
    "\n",
    "visualize_graph(graph)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bcd2d8ad",
   "metadata": {},
   "outputs": [],
   "source": [
    "# 빈 리스트를 사용한 그래프 집계 연산 실행, 모든 데이터에 대한 기본 집계 수행\n",
    "graph.invoke({\"aggregate\": []})"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "8b35570b",
   "metadata": {},
   "source": [
    "## 조건부 분기(conditional branching)\n",
    "\n",
    "fan-out이 결정적이지 않은 경우, `add_conditional_edges` 를 직접 사용할 수 있습니다.\n",
    "\n",
    "조건부 분기 이후 연결될 알려진 \"sink\" 노드가 있는 경우, 조건부 엣지를 생성할 때 `then=\"실행할 노드명\"` 을 제공할 수 있습니다."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "id": "93354095",
   "metadata": {},
   "outputs": [],
   "source": [
    "from typing import Annotated, Sequence\n",
    "from typing_extensions import TypedDict\n",
    "from langgraph.graph import END, START, StateGraph\n",
    "\n",
    "\n",
    "# 상태 정의(add_messages 리듀서 사용)\n",
    "class State(TypedDict):\n",
    "    aggregate: Annotated[list, add_messages]\n",
    "    which: str\n",
    "\n",
    "\n",
    "# 노드별 고유 값을 반환하는 클래스\n",
    "class ReturnNodeValue:\n",
    "    def __init__(self, node_secret: str):\n",
    "        self._value = node_secret\n",
    "\n",
    "    def __call__(self, state: State) -> Any:\n",
    "        print(f\"Adding {self._value} to {state['aggregate']}\")\n",
    "        return {\"aggregate\": [self._value]}\n",
    "\n",
    "\n",
    "# 상태 그래프 초기화\n",
    "builder = StateGraph(State)\n",
    "builder.add_node(\"a\", ReturnNodeValue(\"I'm A\"))\n",
    "builder.add_edge(START, \"a\")\n",
    "builder.add_node(\"b\", ReturnNodeValue(\"I'm B\"))\n",
    "builder.add_node(\"c\", ReturnNodeValue(\"I'm C\"))\n",
    "builder.add_node(\"d\", ReturnNodeValue(\"I'm D\"))\n",
    "builder.add_node(\"e\", ReturnNodeValue(\"I'm E\"))\n",
    "\n",
    "\n",
    "# 상태의 'which' 값에 따른 조건부 라우팅 경로 결정 함수\n",
    "def route_bc_or_cd(state: State) -> Sequence[str]:\n",
    "    if state[\"which\"] == \"cd\":\n",
    "        return [\"c\", \"d\"]\n",
    "    return [\"b\", \"c\"]\n",
    "\n",
    "\n",
    "# 전체 병렬 처리할 노드 목록\n",
    "intermediates = [\"b\", \"c\", \"d\"]\n",
    "\n",
    "builder.add_conditional_edges(\n",
    "    \"a\",\n",
    "    route_bc_or_cd,\n",
    "    intermediates,\n",
    ")\n",
    "for node in intermediates:\n",
    "    builder.add_edge(node, \"e\")\n",
    "\n",
    "\n",
    "# 최종 노드 연결 및 그래프 컴파일\n",
    "builder.add_edge(\"e\", END)\n",
    "graph = builder.compile()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9f69fcdf",
   "metadata": {},
   "source": [
    "아래는 참고용 코드입니다. `then` 문법을 사용할 경우, `then=\"e\"` 를 추가하고 edge 연결을 추가하지 않아도 됩니다."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e718ebeb",
   "metadata": {},
   "outputs": [],
   "source": [
    "## then 문법 사용시\n",
    "# builder.add_conditional_edges(\n",
    "#     \"a\",\n",
    "#     route_bc_or_cd,\n",
    "#     intermediates,\n",
    "#     then=\"e\",\n",
    "# )"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3400d24c",
   "metadata": {},
   "source": [
    "그래프를 시각화합니다."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9ac61928",
   "metadata": {},
   "outputs": [],
   "source": [
    "from langchain_teddynote.graphs import visualize_graph\n",
    "\n",
    "visualize_graph(graph)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6b961327",
   "metadata": {},
   "outputs": [],
   "source": [
    "# 그래프 실행(which: bc 로 지정)\n",
    "graph.invoke({\"aggregate\": [], \"which\": \"bc\"})"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e4877888",
   "metadata": {},
   "outputs": [],
   "source": [
    "# 그래프 실행(which: cd 로 지정)\n",
    "graph.invoke({\"aggregate\": [], \"which\": \"cd\"})"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "0c93638e",
   "metadata": {},
   "source": [
    "## fan-out 값의 신뢰도에 따른 정렬\n",
    "\n",
    "병렬로 펼쳐진 노드들은 하나의 \"**super-step**\"으로 실행됩니다. 각 super-step 에서 발생한 업데이트들은 해당 super-step 이 완료된 후 순차적으로 상태에 적용됩니다.\n",
    "\n",
    "병렬 super-step 에서 일관된 사전 정의된 업데이트 순서가 필요한 경우, 출력값을 식별 키와 함께 상태의 별도 필드에 기록한 다음, 팬아웃된 각 노드에서 집결 지점까지 일반 `edge` 를 추가하여 \"sink\" 노드에서 이들을 결합해야 합니다.\n",
    "\n",
    "예를 들어, 병렬 단계의 출력을 \"신뢰도\"에 따라 정렬하고자 하는 경우를 고려해보겠습니다."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "id": "57e475ae",
   "metadata": {},
   "outputs": [],
   "source": [
    "from typing import Annotated, Sequence\n",
    "from typing_extensions import TypedDict\n",
    "from langgraph.graph import StateGraph\n",
    "from langgraph.graph.message import add_messages\n",
    "\n",
    "\n",
    "# 팬아웃 값들의 병합 로직 구현, 빈 리스트 처리 및 리스트 연결 수행\n",
    "def reduce_fanouts(left, right):\n",
    "    if left is None:\n",
    "        left = []\n",
    "    if not right:\n",
    "        # 덮어쓰기\n",
    "        return []\n",
    "    return left + right\n",
    "\n",
    "\n",
    "# 상태 관리를 위한 타입 정의, 집계 및 팬아웃 값 저장 구조 설정\n",
    "class State(TypedDict):\n",
    "    # add_messages 리듀서 사용\n",
    "    aggregate: Annotated[list, add_messages]\n",
    "    fanout_values: Annotated[list, reduce_fanouts]\n",
    "    which: str\n",
    "\n",
    "\n",
    "# 그래프 초기화\n",
    "builder = StateGraph(State)\n",
    "builder.add_node(\"a\", ReturnNodeValue(\"I'm A\"))\n",
    "builder.add_edge(START, \"a\")\n",
    "\n",
    "\n",
    "# 병렬 노드 값 반환 클래스\n",
    "class ParallelReturnNodeValue:\n",
    "    def __init__(\n",
    "        self,\n",
    "        node_secret: str,\n",
    "        reliability: float,\n",
    "    ):\n",
    "        self._value = node_secret\n",
    "        self._reliability = reliability\n",
    "\n",
    "    # 호출시 상태 업데이트\n",
    "    def __call__(self, state: State) -> Any:\n",
    "        print(f\"Adding {self._value} to {state['aggregate']} in parallel.\")\n",
    "        return {\n",
    "            \"fanout_values\": [\n",
    "                {\n",
    "                    \"value\": [self._value],\n",
    "                    \"reliability\": self._reliability,\n",
    "                }\n",
    "            ]\n",
    "        }\n",
    "\n",
    "\n",
    "# 신뢰도(reliability)가 다른 병렬 노드들 추가\n",
    "builder.add_node(\"b\", ParallelReturnNodeValue(\"I'm B\", reliability=0.1))\n",
    "builder.add_node(\"c\", ParallelReturnNodeValue(\"I'm C\", reliability=0.9))\n",
    "builder.add_node(\"d\", ParallelReturnNodeValue(\"I'm D\", reliability=0.5))\n",
    "\n",
    "\n",
    "# 팬아웃 값들을 신뢰도 기준으로 정렬하고 최종 집계 수행\n",
    "def aggregate_fanout_values(state: State) -> Any:\n",
    "    # 신뢰도 기준 정렬\n",
    "    ranked_values = sorted(\n",
    "        state[\"fanout_values\"], key=lambda x: x[\"reliability\"], reverse=True\n",
    "    )\n",
    "    print(ranked_values)\n",
    "    return {\n",
    "        \"aggregate\": [x[\"value\"][0] for x in ranked_values] + [\"I'm E\"],\n",
    "        \"fanout_values\": [],\n",
    "    }\n",
    "\n",
    "\n",
    "# 집계 노드 추가\n",
    "builder.add_node(\"e\", aggregate_fanout_values)\n",
    "\n",
    "\n",
    "# 상태에 따른 조건부 라우팅 로직 구현\n",
    "def route_bc_or_cd(state: State) -> Sequence[str]:\n",
    "    if state[\"which\"] == \"cd\":\n",
    "        return [\"c\", \"d\"]\n",
    "    return [\"b\", \"c\"]\n",
    "\n",
    "\n",
    "# 중간 노드들 설정 및 조건부 엣지 추가\n",
    "intermediates = [\"b\", \"c\", \"d\"]\n",
    "builder.add_conditional_edges(\"a\", route_bc_or_cd, intermediates)\n",
    "\n",
    "# 중간 노드들과 최종 집계 노드 연결\n",
    "for node in intermediates:\n",
    "    builder.add_edge(node, \"e\")\n",
    "\n",
    "# 그래프 완성을 위한 최종\n",
    "graph = builder.compile()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "4a4645d3",
   "metadata": {},
   "source": [
    "그래프를 시각화합니다."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "61d29bec",
   "metadata": {},
   "outputs": [],
   "source": [
    "from langchain_teddynote.graphs import visualize_graph\n",
    "\n",
    "visualize_graph(graph)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "086416d2",
   "metadata": {},
   "source": [
    "병렬로 노드 실행시 결과를 reliability 기준으로 정렬합니다.\n",
    "\n",
    "**참고**\n",
    "\n",
    "- `b`: reliability=0.1\n",
    "- `c`: reliability=0.9\n",
    "- `d`: reliability=0.5"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bff43203",
   "metadata": {},
   "outputs": [],
   "source": [
    "# 그래프 실행(which: bc 로 지정)\n",
    "graph.invoke({\"aggregate\": [], \"which\": \"bc\", \"fanout_values\": []})"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9e2fe2fb",
   "metadata": {},
   "outputs": [],
   "source": [
    "# 그래프 실행(which: cd 로 지정)\n",
    "graph.invoke({\"aggregate\": [], \"which\": \"cd\"})"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "langchain-kr-lwwSZlnu-py3.11",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
